package com.yxsk.relay.job.admin.core.trigger;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.yxsk.relay.job.admin.core.executor.JobExecutor;
import com.yxsk.relay.job.admin.core.executor.listener.JobExecuteListener;
import com.yxsk.relay.job.admin.core.executor.quartz.context.QuartzRemoteJobExecuteContext;
import com.yxsk.relay.job.admin.core.router.endpoint.EndpointRouterInfo;
import com.yxsk.relay.job.admin.core.router.endpoint.SimpleEndpointRouterInfo;
import com.yxsk.relay.job.admin.core.router.selector.RouterDecisionMaker;
import com.yxsk.relay.job.admin.core.schedule.JobTriggerInfo;
import com.yxsk.relay.job.admin.core.schedule.lifecycle.manage.TaskTriggerManager;
import com.yxsk.relay.job.admin.core.schedule.quartz.context.TriggerContext;
import com.yxsk.relay.job.admin.data.entity.JobPartitionLog;
import com.yxsk.relay.job.admin.data.service.JobPartitionLogService;
import com.yxsk.relay.job.admin.exception.job.JobPartitionException;
import com.yxsk.relay.job.component.admin.utils.SpringBeanUtils;
import com.yxsk.relay.job.component.common.constant.UriConstant;
import com.yxsk.relay.job.component.common.exception.RelayJobRuntimeException;
import com.yxsk.relay.job.component.common.exception.remote.RemoteCallException;
import com.yxsk.relay.job.component.common.protocol.caller.RemoteCaller;
import com.yxsk.relay.job.component.common.protocol.message.base.ResultResponse;
import com.yxsk.relay.job.component.common.protocol.message.partition.JobPartitionRequest;
import com.yxsk.relay.job.component.common.protocol.message.partition.JobPartitionResponse;
import com.yxsk.relay.job.component.common.utils.DateUtils;
import com.yxsk.relay.job.component.common.utils.SerialNoUtils;
import com.yxsk.relay.job.component.common.vo.Endpoint;
import com.yxsk.relay.job.component.common.vo.PartitionInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;

import java.text.MessageFormat;
import java.util.*;

/**
 * @Author 11376
 * @CreaTime 2019/7/19 10:38
 * @Description
 */
@Slf4j
public class PartitionDynamicTrigger extends AsyncDynamicTrigger {

    public PartitionDynamicTrigger(TaskTriggerManager taskTriggerManager, TriggerContext context, RouterDecisionMaker routerDecisionMaker) {
        super(taskTriggerManager, context, routerDecisionMaker);
    }

    @Override
    protected JobExecutor getJobExecutor(JobExecuteListener executeListener) {
        return super.getJobExecutor(executeListener);
    }

    @Override
    protected void executeRemoteJob(JobExecutor jobExecutor, List<EndpointRouterInfo> endpoints) {
        // 任务分片
        List<PartitionInfo> partition = partition(endpoints);

        partition.stream().forEach(partitionInfo -> {
            // 实例化任务执行上下文
            QuartzRemoteJobExecuteContext executeContext = new QuartzRemoteJobExecuteContext();
            executeContext.setSerialNo(SerialNoUtils.nextId());
            executeContext.setJobConfig(this.context.getJobConfig());
            JobTriggerInfo triggerInfo = new JobTriggerInfo();
            BeanUtils.copyProperties(this.context.getTriggerInfo(), triggerInfo);
            triggerInfo.setTriggerParam(partitionInfo.getExecuteParam());
            executeContext.setTriggerInfo(triggerInfo);

            SimpleEndpointRouterInfo endpoint = new SimpleEndpointRouterInfo();
            BeanUtils.copyProperties(partitionInfo, endpoint);
            endpoint.setEndpoint(partitionInfo.getEndpoint().clone());
            executeContext.setEndpointRouterInfo(endpoint);

            // 执行
            jobExecutor.execute(executeContext);
        });
    }

    protected List<PartitionInfo> partition(List<EndpointRouterInfo> endpoints) {

        // 组装请求报文
        JobPartitionRequest request = new JobPartitionRequest();
        request.setVersion(UriConstant.VERSION_NO);

        request.setHandleName(this.context.getJobConfig().getHandlerName());
        request.setExecuteParam(this.context.getTriggerInfo().getTriggerParam());
        List<Endpoint> list = new ArrayList<>();
        endpoints.stream().forEach(endpointInfo -> {
            Endpoint endpoint = new Endpoint();
            BeanUtils.copyProperties(endpointInfo.getEndpoint(), endpoint);

            // 获取主机资源信息
            list.add(endpoint);
        });
        request.setEndpoints(list);

        // 取主机列表第一个, 若分片错误继续下一个直到所有主机都失败
        Iterator<EndpointRouterInfo> iterator = endpoints.iterator();
        while (iterator.hasNext()) {
            EndpointRouterInfo endpointRouterInfo = iterator.next();
            try {
                request.setSerialNo(SerialNoUtils.nextId());
                request.setRequestTime(DateUtils.getCurrentDate());
                return this.partitionCall(endpointRouterInfo, request);
            } catch (JobPartitionException | RemoteCallException e) {
                log.error(MessageFormat.format("Fragment request exception, ip: {0}, port: {1}", endpointRouterInfo.getEndpoint().getHost(), endpointRouterInfo.getEndpoint().getPort()), e);
            }
        }

        throw new RelayJobRuntimeException("Task fragmentation error");
    }

    private List<PartitionInfo> partitionCall(EndpointRouterInfo endpointRouterInfo, JobPartitionRequest request) throws JobPartitionException, RemoteCallException {
        // 记录
        JobPartitionLog partitionLog = new JobPartitionLog();
        partitionLog.setId(request.getSerialNo());
        partitionLog.setTriggerLogId(this.context.getTriggerInfo().getTriggerId());
        Endpoint endpoint = endpointRouterInfo.getEndpoint();
        partitionLog.setEndpointIp(endpoint.getHost());
        partitionLog.setEndpointPort(endpoint.getPort());
        partitionLog.setAuthToken(endpoint.getAuthToken());
        partitionLog.setNetProtocol(endpoint.getProtocol().getProtocol());
        partitionLog.setPartitionParam(request.getExecuteParam());
        partitionLog.setEndpointList(convertEndpointList(request));
        partitionLog.setPartitionParam(request.getExecuteParam());
        partitionLog.setBeginTime(DateUtils.getCurrentDate());
        partitionLog.setStatus(JobPartitionLog.PartitionStatusEnum.EXECUTING.getStatus());

        // 添加
        JobPartitionLogService partitionLogService = SpringBeanUtils.getBean(JobPartitionLogService.class);
        partitionLogService.addEntity(partitionLog);

        try {
            RemoteCaller caller = this.getRemoteCaller();
            // 请求头
            Map<String, String> header = new HashMap<>(3);
            if (StringUtils.hasLength(endpointRouterInfo.getEndpoint().getAuthToken())) {
                header.put(UriConstant.AUTHORIZATION_HEADER_KEY, endpoint.getAuthToken());
            }
            ResultResponse<JobPartitionResponse> response = caller.call(request, header, caller.buildUri(endpoint.getHost(), endpoint.getPort(), UriConstant.TASK_PARTITION_URI), new TypeReference<ResultResponse<JobPartitionResponse>>() {
            });
            if (response != null) {
                if (ResultResponse.isOk(response)) {
                    List<PartitionInfo> partitionInfoList = response.getData().getPartitionInfoList();
                    partitionLog.setPartitionResult(JSONObject.toJSONString(partitionInfoList));
                    partitionLog.setStatus(JobPartitionLog.PartitionStatusEnum.SUCCESS.getStatus());
                    return partitionInfoList;
                } else {
                    partitionLog.setErrorMessage(response.getMessage());
                    partitionLog.setStatus(JobPartitionLog.PartitionStatusEnum.FAILED.getStatus());
                    throw new JobPartitionException(response.getMessage());
                }
            }
            String errorMsg = "Partition response is empty.";
            partitionLog.setErrorMessage(errorMsg);
            partitionLog.setStatus(JobPartitionLog.PartitionStatusEnum.FAILED.getStatus());

            throw new JobPartitionException(errorMsg);
        } finally {
            partitionLog.setEndTime(DateUtils.getCurrentDate());
            partitionLogService.updateById(partitionLog);
        }
    }

    private String convertEndpointList(JobPartitionRequest request) {
        return JSONObject.toJSONString(request.getEndpoints());
    }

    protected RemoteCaller getRemoteCaller() {
        return SpringBeanUtils.getBean(RemoteCaller.class);
    }

}
